feature: Tracking and inspecting historical chunks #28
Pull request overview
This PR adds first-class support for tracking and inspecting historical “chunk runs” (bounded work units like ledger backfill/repair ranges) end-to-end: protobuf API surface, control-plane RPC implementations, persistent storage, a Go component-side reporter helper, and operator-facing flowctl chunks CLI commands.
Changes:
- Added protobuf types/enums and RPCs for chunk-run upsert/get/list, plus regenerated Go protobuf + gRPC bindings.
- Implemented chunk-run storage across BoltDB + in-memory backends, and wired new RPC handlers into the control plane.
- Introduced a Go reporter helper for components and a new flowctl chunks CLI command group, with accompanying docs and tests.
Reviewed changes
Copilot reviewed 12 out of 14 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| proto/control_plane.proto | Defines ChunkRun, ChunkStatus, FailureClass, and chunk-run RPCs. |
| proto/control_plane.pb.go | Regenerated Go protobuf bindings for the new API surface. |
| proto/control_plane_grpc.pb.go | Regenerated Go gRPC bindings, adding chunk-run client/server methods. |
| pkg/component/reporter.go | Adds a component-side reporter helper to register/heartbeat and upsert chunk status. |
| pkg/component/reporter_test.go | Unit tests for env parsing and noop behavior when disabled. |
| internal/storage/memory.go | Adds in-memory chunk-run CRUD + listing with filters/limits. |
| internal/storage/interface.go | Extends storage interface with chunk-run types/methods + not-found error type. |
| internal/storage/bolt.go | Adds BoltDB bucket + chunk-run persistence and list/get/delete operations. |
| internal/runner/pipeline_runner.go | Injects FLOWCTL_RUN_ID / default FLOWCTL_ATTEMPT into component env. |
| internal/orchestrator/process.go | Adds FLOWCTL_HEARTBEAT_INTERVAL_MS injection for process-managed components. |
| internal/api/control_plane.go | Implements UpsertChunkRun, GetChunkRun, ListChunkRuns RPCs and wrapper methods. |
| internal/api/control_plane_test.go | Adds test coverage for chunk-run upsert/get/list behavior. |
| docs/component-flowctl-reporting.md | Documents component reporting contract, Go helper usage, and CLI inspection. |
| cmd/chunks.go | Adds flowctl chunks list/show operator commands with filters and formatted output. |
Files not reviewed (2)
- proto/control_plane.pb.go: Generated file
- proto/control_plane_grpc.pb.go: Generated file
    count := int32(0)
    return b.ForEach(func(k, v []byte) error {
        if limit > 0 && count >= limit {
            return nil
        }

        var chunk ChunkRunInfo
        if err := json.Unmarshal(v, &chunk); err != nil {
            return fmt.Errorf("failed to unmarshal chunk run: %w", err)
        }
        if pipelineRunID != "" && chunk.Chunk.PipelineRunId != pipelineRunID {
            return nil
        }
        if componentID != "" && chunk.Chunk.ComponentId != componentID {
            return nil
        }
        if status != flowctlpb.ChunkStatus_CHUNK_STATUS_UNKNOWN && chunk.Chunk.Status != status {
            return nil
        }

        chunks = append(chunks, &chunk)
        count++
        return nil
    })

    chunk := req.Chunk
    if chunk.Attempt == 0 {
        chunk.Attempt = 1
    }
    if chunk.ChunkId == "" {
        chunk.ChunkId = fmt.Sprintf("%s:%s:%d-%d:%d", chunk.PipelineRunId, chunk.ComponentId, chunk.ChunkStart, chunk.ChunkEnd, chunk.Attempt)
    }
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 50a2e534bd
    _, err := r.ReportChunk(ctx, ChunkUpdate{
        ChunkStart: start,
        ChunkEnd: end,
        Status: status,
        CompletedAt: &now,
        VerifiedAt: verifiedAt,
        RowCounts: rowCounts,
        Verification: verification,
Preserve prior chunk fields on completion
When callers follow the documented helper sequence (ReportChunkProgress and then ReportChunkCompleted for the same range), both calls get the same generated chunk id and the server upsert replaces the stored record. This completion update omits fields set by the progress update such as StartedAt, Phase, and Metadata, so the final historical chunk no longer shows when it started or what phase it reached; merge the previous chunk state or carry these fields forward on terminal updates.
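One way to carry fields forward, sketched under assumptions: the ChunkRun struct below is a simplified, hypothetical stand-in for the stored record (string timestamps instead of protobuf timestamps), and mergeChunkRun is an illustrative helper, not a function from this PR. The idea is that the upsert path reads the previous record for the same chunk id and fills any field the terminal update left unset.

```go
package main

import "fmt"

// ChunkRun is a hypothetical, simplified stand-in for the stored chunk record.
// Only the fields discussed in the review comment are modeled.
type ChunkRun struct {
	ChunkID     string
	Status      string
	StartedAt   string // empty means "not set" in this sketch
	CompletedAt string
	Phase       string
	Metadata    map[string]string
}

// mergeChunkRun returns update with unset fields filled in from prev.
// Fields set on update always win; metadata keys are merged, with update
// taking precedence on conflicts.
func mergeChunkRun(prev, update ChunkRun) ChunkRun {
	merged := update
	if merged.StartedAt == "" {
		merged.StartedAt = prev.StartedAt
	}
	if merged.Phase == "" {
		merged.Phase = prev.Phase
	}
	if len(prev.Metadata) > 0 {
		md := make(map[string]string, len(prev.Metadata)+len(update.Metadata))
		for k, v := range prev.Metadata {
			md[k] = v
		}
		for k, v := range update.Metadata {
			md[k] = v
		}
		merged.Metadata = md
	}
	return merged
}

func main() {
	// State written by an earlier progress update.
	progress := ChunkRun{
		ChunkID:   "run-1:comp-a:100-199:1",
		Status:    "RUNNING",
		StartedAt: "2024-01-01T00:00:00Z",
		Phase:     "verify",
		Metadata:  map[string]string{"source": "archive"},
	}
	// Terminal update that omits StartedAt, Phase, and Metadata.
	completed := ChunkRun{
		ChunkID:     progress.ChunkID,
		Status:      "COMPLETED",
		CompletedAt: "2024-01-01T00:05:00Z",
	}
	fmt.Printf("%+v\n", mergeChunkRun(progress, completed))
}
```

Merging on the server keeps every reporter client simple, at the cost of a read before each write; carrying the fields forward in the helper would avoid that read but only fixes callers that use the helper.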
    conn, err := grpc.DialContext(ctx, cfg.Endpoint,
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithBlock(),
    )
Bound reporter connection attempts
If a component passes a long-lived/root context to NewReporter (as in the new usage docs), grpc.WithBlock() waits until the control-plane endpoint connects or that context is cancelled. With a misconfigured or temporarily unavailable endpoint this hangs component startup indefinitely instead of failing fast, so the dial should use an internal timeout or avoid blocking unless the caller supplied a deadline.
    _, err := r.client.Heartbeat(ctx, &flowctlpb.ServiceHeartbeat{
        ServiceId: serviceID,
        Metrics: metrics,
Wire reporter heartbeat metrics through the wrapper
These metrics are sent on the flowctlpb heartbeat path, but the registered ControlPlaneWrapper.Heartbeat currently converts the request into a v1 heartbeat with only ServiceId, so values from the new StartHeartbeatLoop (for example ledgers_processed) are silently dropped before the storage merge and never appear in status/monitoring. Convert the metric map in the wrapper or route the reporter through the v1 API.
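The "convert the metric map in the wrapper" option might look roughly like the following. Both message types here are hypothetical stand-ins (the real generated types and the metric value type are not shown in this review, so float64 is an assumption), and toV1Heartbeat is an illustrative name.

```go
package main

import "fmt"

// ServiceHeartbeat is a hypothetical stand-in for the flowctlpb heartbeat
// message; the metric value type is assumed to be float64.
type ServiceHeartbeat struct {
	ServiceId string
	Metrics   map[string]float64
}

// V1Heartbeat is a hypothetical stand-in for the v1 heartbeat message that
// the wrapper forwards to the control plane.
type V1Heartbeat struct {
	ServiceId string
	Metrics   map[string]float64
}

// toV1Heartbeat copies the service id and every metric, so values reported
// by the component survive the conversion instead of being dropped.
func toV1Heartbeat(hb *ServiceHeartbeat) *V1Heartbeat {
	out := &V1Heartbeat{ServiceId: hb.ServiceId}
	if len(hb.Metrics) > 0 {
		out.Metrics = make(map[string]float64, len(hb.Metrics))
		for name, value := range hb.Metrics {
			out.Metrics[name] = value
		}
	}
	return out
}

func main() {
	in := &ServiceHeartbeat{
		ServiceId: "ledger-backfill",
		Metrics:   map[string]float64{"ledgers_processed": 1200},
	}
	out := toV1Heartbeat(in)
	fmt.Println(out.ServiceId, out.Metrics["ledgers_processed"])
}
```

Copying the map rather than sharing it keeps the forwarded message independent of later mutation by the caller.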
This pull request introduces support for tracking and inspecting historical "chunk runs" in the control plane, which represent bounded work units (such as ledger backfill/repair ranges) processed by data-plane components. The changes include new storage and API methods, CLI commands for operator inspection, and documentation for component integration.
Chunk Run Management and API:
- Storage methods in internal/storage/interface.go and internal/storage/bolt.go to upsert, retrieve, list, and delete chunk runs, including error handling for not-found cases.
- UpsertChunkRun, GetChunkRun, and ListChunkRuns RPCs in the control plane server and wrapper (internal/api/control_plane.go).
CLI and Operator Tools:
- flowctl chunks CLI command with list and show subcommands for inspecting chunk runs, including filtering and detailed output (cmd/chunks.go).
Component and Pipeline Runner Integration:
- FLOWCTL_RUN_ID and FLOWCTL_ATTEMPT environment variables for components (internal/orchestrator/process.go, internal/runner/pipeline_runner.go).
Documentation and Testing:
- Component reporting documentation (docs/component-flowctl-reporting.md).
- Test coverage for the chunk-run RPCs (internal/api/control_plane_test.go).
These changes provide a foundation for reliable tracking and inspection of historical work units, improving both operator visibility and data-plane component integration.